HiveMQ MQTT Client是一個 Java 客戶端庫,用於在 Java 應用程序中實現 MQTT 客戶端。它是由 HiveMQ 團隊開發的庫,幫助開發者輕鬆地集成 MQTT 通信功能到他們的 Java 應用程序中。 HiveMQ MQTT Client 可以用於創建 MQTT 客戶端,以連接到 MQTT broker並執行發布、訂閱等操作。
MQTT 是一種網絡協議,用於在客戶端和服務器之間進行消息傳遞。它是一種輕量級、發布/訂閱型協議,特別適用於物聯網(IoT)設備之間的通信。
MQTT broker 是充當消息的中間服務器,負責接收、轉發和分發 MQTT 消息。
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.0</version>
</dependency>
//創建MQTT5Client
Mqtt5Client client = Mqtt5Client.builder()
.identifier(clientId)
.serverHost(broker)
.serverPort(1883)
.build();
當執行某個阻塞操作時,程式會等待,直到該操作完成並返回結果,然後才能繼續執行其他任務。
Mqtt5BlockingClient blockingClient = Mqtt5Client.builder().buildBlocking();
在非同步編程中,任務可以提交給執行緒,然後繼續執行其他任務,當任務完成時,可以通過回調或其他機制來處理結果。
Mqtt5AsyncClient asyncClient = Mqtt5Client.builder().buildAsync();
它建立在非同步編程的基礎上,強調事件的流動和數據的響應。在反應式編程中,系統可以對數據流中的事件做出響應,執行相應的操作。
Mqtt5RxClient rxClient = Mqtt5Client.builder().buildRx();
MqttConfig.java
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConfig {
private final String topic = "test/"; // 訂閱的 MQTT 主題
// 客戶端識別ID
String clientId = "clientA";
// MQTT 伺服器位址
String broker = "broker.hivemq.com";
@Bean
public void connectMqttClient() {
//創建MQTT5Client
Mqtt5AsyncClient client = Mqtt5Client.builder()
.identifier(clientId)
.serverHost(broker)
.serverPort(1883)
.buildAsync();
//連接
client.connect()
.whenComplete((connAck, throwable) -> {
if (throwable != null) {
//發生錯誤時
log.error(throwable.getMessage());
} else {
//成功連接時
log.error("連接成功");
//主動訂閱主題
client.subscribeWith()
.topicFilter(topic)
.callback(this::onMessageReceived) //將發送到主題中的訊息做處理
.send()
.whenComplete((subAck, throwable_sub) -> {
if (throwable_sub != null) {
log.error(throwable_sub.getMessage());
} else {
log.error("訂閱 " + topic + "主題 成功");
}
});
}
});
}
//處理傳來的訊息
private void onMessageReceived(Mqtt5Publish publish) {
String topic = publish.getTopic().toString();
String payload = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
log.error("Received message on topic '" + topic + "': " + payload);
}
}